package com.hujf.flink.ch05;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * @Author hujf
 * @Date 2022/5/23 11:51
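 * @Description Demo Flink source that repeatedly emits a word picked at random from a fixed list, then sleeps for a delay derived from the chosen index.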
 */
public class MySource implements SourceFunction<String> {

    /**
     * Loop flag polled by {@link #run(SourceContext)} and cleared by {@link #cancel()}.
     * Declared {@code volatile} so that a write made by the thread calling
     * {@code cancel()} is visible to the thread executing {@code run()}.
     */
    private volatile boolean isRunning = true;

    /**
     * Emits randomly chosen words until {@link #cancel()} is called.
     *
     * <p>Each iteration picks a random index into a fixed word list, emits the word
     * at that index, prints the word and the upcoming delay to standard output, and
     * then sleeps for {@code index * 1000} milliseconds (so index 0 means no pause).
     *
     * @param ctx context used to emit the chosen words
     * @throws Exception if the sleeping thread is interrupted
     *                   ({@link InterruptedException} from {@link Thread#sleep(long)})
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // The word list and the random generator do not change between iterations,
        // so they are built once instead of on every pass through the loop.
        List<String> words = new ArrayList<>();
        words.add("world");
        words.add("flink");
        words.add("hello");
        words.add("my");
        words.add("name");
        words.add("hujd");

        Random random = new Random();
        int wordCount = words.size();

        while (isRunning) {
            int index = random.nextInt(wordCount);
            String word = words.get(index);

            ctx.collect(word);
            System.out.println("source :" + word);

            // The delay is tied to the chosen index: 0 ms for the first word,
            // up to (wordCount - 1) seconds for the last one.
            int delayMillis = index * 1000;
            // Runtime message below means "delay time: <millis>".
            System.out.println("延迟时间： " + delayMillis);
            Thread.sleep(delayMillis);
        }
    }

    /**
     * Requests that {@link #run(SourceContext)} stop by clearing the loop flag.
     * The loop exits the next time it checks the flag, i.e. after any sleep
     * currently in progress finishes or is interrupted.
     */
    @Override
    public void cancel() {
        isRunning = false;
    }
}
